package com.sxt.service.impl;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.dubbo.config.annotation.Reference;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.beans.DocumentObjectBinder;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import com.sxt.model.ProdSolrDto;
import com.sxt.model.ShopCartItem;
import com.sxt.service.ImportProdService;
import com.sxt.service.ProdService;

import lombok.extern.slf4j.Slf4j;

@Service // 导入不需要对外提供服务
@Slf4j
public class ImportServiceImpl implements ImportProdService,MessageListener {

	/**
	 * search 既是服务的提供者,也是服务的消费者
	 */
	@Reference(check = false)
	private ProdService prodService ;

	@Autowired
	private SolrClient  solrClient ;// 集群版和单机版都可以使用

	private Date t1 = new Date() ;

	/**
	 * 1 我们线程池里面默认为4 个线程
	 * 2 我们的总页数 10页
	 *  每一页交个一个线程 
	 *   第一轮 10-> pool-> 4线程->4 页没有了->6 页
	 * 第二轮(假设4个同时导入完毕)6-4 = 2
	 * 第三轮 2 - 4 = 0
	 */
	public static ExecutorService threadPool = Executors.newFixedThreadPool(4); //cpucore *2
	// 4->8 核心线程用完了,队列里面满了,
	//			new ThreadPoolExecutor(4, 8, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());

	@PostConstruct
	public void init() {
		importAllProd();
	}
	/**
	 * 项目一启动,导入所有
	 * 1 查询数据库,准备数据
	 * 2 将数据导入导入solr 里面
	 * 
	 */
	@Override
	public void importAllProd() {
		try {
			solrClient.deleteByQuery("*:*");
			solrClient.commit() ;
		} catch (SolrServerException | IOException e1) {
			System.out.println("清除失败了");
			e1.printStackTrace();
		}
		int size = 20 ;
		int totalCount = prodService.countProd(null,null);
		int totalPage = totalCount % size ==0  ? (totalCount/size) : (totalCount/size+1) ;
		if(totalPage==0) {
			return ;
		}
		// 查询一页数据,然后导入,但是现在若数据量很大的情况,它的导入速度快,但是数据巨大 我们需要多线程导入
		CountDownLatch countDownLatch = new CountDownLatch(totalPage) ;// 总的任务数,当总的任务数,执行完毕后,我主线程执行后面的操作
		for (int i = 1; i <= totalPage; i++) { // 主线程
			final int currentPage = i ;
			threadPool.submit(new Runnable() {
				@Override
				public void run() {
					List<ProdSolrDto> listProd = prodService.listProd(currentPage, size,null,null) ; // 子线程
					import2Solr(listProd) ;
					System.out.println("第"+currentPage+"导入完毕");
					countDownLatch.countDown(); // 任务执行完毕后,自动-
				}
			});
		}
		// 主线程将任务放在pool 里面, 里面往下走了
		// 怎么保证子线程执行完毕后,多线程往下走
		try {
			//			countDownLatch.await(); // 死等
			countDownLatch.await(300, TimeUnit.SECONDS) ;  //等待我们的计数==0 的时,我往下走
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}


	/**
	 * 将这些数据一次性导入到solr 里面
	 * @param listProd
	 */
	protected void import2Solr(List<ProdSolrDto> listProd) {
		if(listProd==null || listProd.isEmpty()) {
			return ;
		}
		List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(listProd.size());
		for (ProdSolrDto prodSolrDto : listProd) {
			DocumentObjectBinder documentObjectBinder = new DocumentObjectBinder();
			// 可以直接prodSolrDto 里面的属性和solrInputDocument 属性之间完成对应,但是要添加注解
			SolrInputDocument solrInputDocument = documentObjectBinder.toSolrInputDocument(prodSolrDto);
			String priceStr = solrInputDocument.getFieldValue("price").toString();
			Double price = Double.valueOf(priceStr);
			//			solrInputDocument.addField(name, value); // 追加
			solrInputDocument.setField("price", price); // 覆盖

			String positiveRatingStr = solrInputDocument.getFieldValue("positive_rating").toString();
			Double positiveRating = Double.valueOf(positiveRatingStr);
			//			solrInputDocument.addField(name, value); // 追加
			solrInputDocument.setField("positive_rating", positiveRating); // 覆盖
			// 必须是类型对应才能转化(java 里面的类型,和solr 里面的类型完全对才行)
			docs.add(solrInputDocument);
		}
		try {
			solrClient.add(docs); // 导入
			solrClient.commit() ;// 提交事务
		} catch (Exception e) {
			log.info("导入失败,原因为{}",e.getMessage());
			e.printStackTrace();
		} 
	}

	/**
	 * 0 0 10,14,16 * * ? 每天上午10点，下午2点，4点
0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时
0 0 12 ? * WED 表示每个星期三中午12点
"0 0 12 * * ?" 每天中午12点触发
"0 15 10 ? * *" 每天上午10:15触发
"0 15 10 * * ?" 每天上午10:15触发
"0 15 10 * * ? *" 每天上午10:15触发
"0 15 10 * * ? 2005" 2005年的每天上午10:15触发
"0 * 14 * * ?" 在每天下午2点到下午2:59期间的每1分钟触发
"0 0/5 14 * * ?" 在每天下午2点到下午2:55期间的每5分钟触发
"0 0/5 14,18 * * ?" 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
"0 0-5 14 * * ?" 在每天下午2点到下午2:05期间的每1分钟触发
"0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44触发
"0 15 10 ? * MON-FRI" 周一至周五的上午10:15触发
"0 15 10 15 * ?" 每月15日上午10:15触发
"0 15 10 L * ?" 每月最后一日的上午10:15触发
"0 15 10 ? * 6L" 每月的最后一个星期五上午10:15触发
"0 15 10 ? * 6L 2002-2005" 2002年至2005年的每月的最后一个星期五上午10:15触发
"0 15 10 ? * 6#3" 每月的第三个星期五上午10:15触发
	 */
	/**
	 * 实现增量的导入
	 */
	//    @Scheduled(cron = "0 0 3 * * ?")// 每天的3 点导入一次
	@Scheduled(initialDelay = 120*1000,fixedRate =120*1000 ) // 2分钟导入一次
	public void importIncrementProd() {
		Date t2 = new Date();
		int increProdDataCount = prodService.countProd(t1, t2);
		int size = 20;
		int totalPage = increProdDataCount % size  ==0  ? (increProdDataCount/size) : (increProdDataCount/size+1) ;
		if(totalPage==0) {
			return ;
		}
		// 查询一页数据,然后导入,但是现在若数据量很大的情况,它的导入速度快,但是数据巨大 我们需要多线程导入
		CountDownLatch countDownLatch = new CountDownLatch(totalPage) ;// 总的任务数,当总的任务数,执行完毕后,我主线程执行后面的操作
		for (int i = 1; i <= totalPage; i++) { // 主线程
			final int currentPage = i ;
			threadPool.submit(new Runnable() {
				@Override
				public void run() {
					List<ProdSolrDto> listProd = prodService.listProd(currentPage, size,t1,t2) ; // 子线程
					import2Solr(listProd) ;
					System.out.println("第"+currentPage+"导入完毕");
					countDownLatch.countDown(); // 任务执行完毕后,自动-
				}
			});
		}
		try {
			countDownLatch.await(300, TimeUnit.SECONDS) ;
			t1 = t2 ; // 更换时间
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	@JmsListener(destination = "order.decr.stock")
	public void onMessage(Message message) {
		ObjectMessage objMsg = (ObjectMessage) message ; 
		try {
			List<ShopCartItem> cartItems = (List<ShopCartItem>) objMsg.getObject();
			importListOrderItem(cartItems);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void importListOrderItem(List<ShopCartItem> cartItems) {
		if(cartItems==null || cartItems.isEmpty()) {
			return ;
		}
		for (ShopCartItem shopCartItem : cartItems) {
			Long prodId = shopCartItem.getProdId(); // 商品的id
			Integer prodCount = shopCartItem.getProdCount(); // 商品的数量
			// 这个导入数据量很少，我们需要先查询solr ，在执行导入
			SolrQuery solrQuery = new SolrQuery("id:"+prodId);
			try {
				QueryResponse queryResponse = solrClient.query(solrQuery);
				SolrDocumentList results = queryResponse.getResults();
				if(results.isEmpty()) {
					log.info("在搜索引擎里面，没有发现id为{}的商品",prodId);
				}
				
				SolrDocument solrDocument = results.get(0); // 更新销量的数据
				Object fieldValue = solrDocument.getFieldValue("sold_num"); //
//				solrDocument.setField(name, value); // 覆盖值
//				Collection<Object> fieldValues = solrDocument.getFieldValues("sold_num");
//				solrDocument.addField(name, value); // 其实在一个字段里面，可以有多个值，该值可以是个集合，使用add 方法可以让它有多个值，但是实际上，我们使用的是最后一个值
				Integer soldNum = Integer.valueOf(fieldValue.toString());
				soldNum += prodCount ;
				solrDocument.setField("sold_num", soldNum); 
				SolrInputDocument  solrInput = solrDocument2SolrInputDocument(solrDocument);
				//若你的version 和solr 里面的version 是相同的，它做的局部的更新，若没有version ，它就会做一个新增的操作
				// 局部的更新：会先对比solr 字段和我本次字段的是否相同，若相同，不更新，不相同，更新
				solrClient.add(solrInput) ;// 这个导入不同，它是局部的更新，并不好影响多个索引的结构
				solrClient.commit() ; 
			} catch (SolrServerException | IOException e) {
				e.printStackTrace();
			}
		}
		
	}
	private SolrInputDocument solrDocument2SolrInputDocument(SolrDocument solrDocument) {
		Iterator<Entry<String, Object>> iterator = solrDocument.iterator();
		SolrInputDocument solrInputDocument = new SolrInputDocument();
		while(iterator.hasNext()) {
			Entry<String, Object> next = iterator.next(); // 字段的名称  // 字段的值
			String fieldName = next.getKey();
			Object fieldValue = next.getValue();
			solrInputDocument.setField(fieldName, fieldValue);
		}
		return solrInputDocument;
	}

}
